package org.example.rabbitmq.all;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @author zfj
 * @date 2022/2/21 -11:44
 */
public class Consumer {
	private static Runnable runnable = () -> {
		// 1: 创建连接工厂
		ConnectionFactory connectionFactory = new ConnectionFactory();
		// 2: 设置连接属性
		connectionFactory.setHost("127.0.0.1");
		connectionFactory.setPort(5672);
		connectionFactory.setVirtualHost("/");
		connectionFactory.setUsername("admin");
		connectionFactory.setPassword("admin");
		//获取队列的名称
		final String queueName = Thread.currentThread().getName();
		Connection connection = null;
		Channel channel = null;
		try {
			// 3: 从连接工厂中获取连接
			connection = connectionFactory.newConnection("生产者");
			// 4: 从连接中获取通道channel
			channel = connection.createChannel();
			// 5: 申明队列queue存储消息
			/*
			 *  如果队列不存在，则会创建
			 *  Rabbitmq不允许创建两个相同的队列名称，否则会报错。
			 *
			 *  @params1： queue 队列的名称
			 *  @params2： durable 队列是否持久化
			 *  @params3： exclusive 是否排他，即是否私有的，如果为true,会对当前队列加锁，其他的通道不能访问，并且连接自动关闭
			 *  @params4： autoDelete 是否自动删除，当最后一个消费者断开连接之后是否自动删除消息。
			 *  @params5： arguments 可以设置队列附加参数，设置队列的有效期，消息的最大长度，队列的消息生命周期等等。
			 * */
			// 这里如果queue已经被创建过一次了，可以不需要定义
			//channel.queueDeclare("queue1", false, false, false, null);
			// 6： 定义接受消息的回调
			Channel finalChannel = channel;
			finalChannel.basicConsume(queueName, true, new DeliverCallback() {
				@Override
				public void handle(String s, Delivery delivery) throws IOException {
					System.out.println(queueName + "：收到消息是：" + new String(delivery.getBody(), "UTF-8"));
				}
			}, new CancelCallback() {
				@Override
				public void handle(String s) throws IOException {
				}
			});
			System.out.println(queueName + "：开始接受消息");
			System.in.read();
		} catch (Exception ex) {
			ex.printStackTrace();
			System.out.println("发送消息出现异常...");
		} finally {
			// 7: 释放连接关闭通道
			if (channel != null && channel.isOpen()) {
				try {
					channel.close();
				} catch (Exception ex) {
					ex.printStackTrace();
				}
			}
			if (connection != null && connection.isOpen()) {
				try {
					connection.close();
				} catch (Exception ex) {
					ex.printStackTrace();
				}
			}
		}
	};
	public static void main(String[] args) {
		// 启动三个线程去执行
		new Thread(runnable, "queue4").start();
		new Thread(runnable, "queue5").start();
		new Thread(runnable, "queue6").start();
	}
}

